{#
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
-#}
from __future__ import annotations

import {{ pickling_library }}
import sys
import os
# Setting the PYTHON_OPERATORS_VIRTUAL_ENV_MODE environment variable to 1
# helps to avoid the issue of re-creating the ORM session in the settings file;
# otherwise it fails with airflow-db-not-allowed.

os.environ["PYTHON_OPERATORS_VIRTUAL_ENV_MODE"] = "1"
{% if expect_airflow %}
 {# Check whether Airflow is available in the environment.
 # If it is, we'll want to ensure that we integrate any macros that are being provided
 # by plugins prior to unpickling the task context.
 #
 # No interpreter-version guard is needed around this: the
 # `from __future__ import annotations` line at the top of this script already
 # refuses to compile on anything older than Python 3.7. #}
try:
    from airflow.plugins_manager import integrate_macros_plugins
    integrate_macros_plugins()
except ImportError:
    {# Airflow is not available in this environment, therefore we won't
     # be able to integrate any plugin macros. #}
    pass
{% endif %}

try:
    from airflow.sdk.execution_time import task_runner
except ModuleNotFoundError:
    pass
else:
    {#-
      We are in an Airflow 3.x environment: try to set up supervisor comms so
      that code running in the virtualenv can access Vars/Conn/XCom/etc. the
      same way normal tasks can.

      The walrus operator (`:=`) is deliberately not used below. People may be
      running this on pre-3.8 versions of Python, and while Airflow doesn't
      support them, avoiding that operator is an easy way to not break them.
    #}
    reinit_supervisor_comms = getattr(task_runner, "reinit_supervisor_comms", None)
    if reinit_supervisor_comms:
        reinit_supervisor_comms()

# Script
{{ python_callable_source }}

{% if modified_dag_module_name is defined %}
# Monkey patching for the cases when python_callable is part of the dag module:
# register a stand-in module under the dag module's name that exposes the callable.

import types

{{ modified_dag_module_name }} = types.ModuleType("{{ modified_dag_module_name }}")
{{ modified_dag_module_name }}.{{ python_callable }} = {{ python_callable }}
sys.modules["{{ modified_dag_module_name }}"] = {{ modified_dag_module_name }}

{%- endif -%}

{% if op_args or op_kwargs %}
# The positional and keyword arguments are deserialized from the file named by argv[1].
with open(sys.argv[1], "rb") as file:
    arg_dict = {{ pickling_library }}.load(file)
{% else %}
# No arguments were supplied, so the callable is invoked with none.
arg_dict = {"args": [], "kwargs": {}}
{% endif %}

{% if string_args_global | default(true) -%}
# Read string args: one entry per line of the file named by argv[3], whitespace-stripped.
with open(sys.argv[3], "r") as file:
    virtualenv_string_args = [line.strip() for line in file]
{% endif %}

# Invoke the callable with the unpacked positional and keyword arguments.
try:
    res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
    # Record the exception's message in the file named by argv[4], then re-raise
    # the same exception so this script still terminates with the failure.
    with open(sys.argv[4], "w") as file:
        file.write(str(e))
    raise

# Write output
# Opening in "wb" mode creates (or truncates) the file named by argv[2] even when
# the callable returned None; in that case nothing is dumped and the file stays empty.
with open(sys.argv[2], "wb") as file:
    if res is not None:
        {{ pickling_library }}.dump(res, file)
